package com.meicai.data.engine.distribute;

import com.meicai.data.pojo.SkuItem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Partitioner;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * hadoop二次排序用到的相关类
 */
public class SecondarySortApi {

    /**
     * 对sku+operate_tiem+objectId进行二次排序, 由于如分拣可能同个时间产生多个oi,时间一样，所以用objectId再进行排序
     */
    public static class SkuOutputKey implements Writable, WritableComparable<SkuOutputKey> {

        private Text skuKey = new Text();
        private LongWritable operateTime = new LongWritable();
        private LongWritable id = new LongWritable();
        //默认构造方法一定要写
        public SkuOutputKey(){}
        public SkuOutputKey(SkuItem sku, long operateTime, long id){
            this.skuKey.set(sku.toString());
            this.operateTime.set(operateTime);
            this.id.set(id);
        }

        @Override
        public int hashCode(){
            return this.skuKey.hashCode() * 163 + this.operateTime.hashCode() * 163
                    + this.id.hashCode() * 163;
        }

        @Override
        public boolean equals(Object obj){
            if(obj instanceof SkuOutputKey){
                SkuOutputKey mo = (SkuOutputKey)obj;
                return this.skuKey.equals(mo.skuKey) && this.operateTime.equals(mo.operateTime)
                        && this.id.equals(mo.id);
            }
            return false;
        }

        @Override
        public int compareTo(SkuOutputKey o) {
            int comValue = this.skuKey.compareTo(o.skuKey);
            if(comValue == 0){
                comValue = this.operateTime.compareTo(o.operateTime);
                if(comValue == 0){
                    comValue = this.id.compareTo(o.id);
                }
            }
            return comValue;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            this.skuKey.write(dataOutput);
            this.operateTime.write(dataOutput);
            this.id.write(dataOutput);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.skuKey.readFields(dataInput);
            this.operateTime.readFields(dataInput);
            this.id.readFields(dataInput);
        }

        public Text getSkuKey() {
            return skuKey;
        }

        public SkuItem toSkuItem(){
            String items[] = this.skuKey.toString().split("_");
            return new SkuItem(items[0], items[1], items[2], items[3]);
        }

    }

    public static class SkuPartitioner extends Partitioner<SkuOutputKey, Text> {
        @Override
        public int getPartition(SkuOutputKey skuOutputKey, Text text, int i) {
            return Math.abs(skuOutputKey.getSkuKey().hashCode()) % i;
        }
    }

    public static class SkuGroupingComparator extends WritableComparator {
        public SkuGroupingComparator(){
            super(SkuOutputKey.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b){
            SkuOutputKey k1 = (SkuOutputKey)a;
            SkuOutputKey k2 = (SkuOutputKey)b;
            return k1.getSkuKey().compareTo(k2.getSkuKey());
        }
    }
}
